[SPARK-33808][SQL] DataSource V2: Build logical writes in the optimizer #30806
Conversation
I added @rdblue as a co-author of the change, as he addressed the initial comments on my PR and I included his changes.
```diff
@@ -94,7 +96,8 @@ case class OverwriteByExpression(
     deleteExpr: Expression,
     query: LogicalPlan,
     writeOptions: Map[String, String],
-    isByName: Boolean) extends V2WriteCommand {
+    isByName: Boolean,
+    write: Option[Write] = None) extends V2WriteCommand {
```
Making this optional allows us to reuse the same plan before we construct a write and after. Having `None` here means the logical write hasn't been constructed yet. This allows us to have idempotent rules in the optimizer.
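To illustrate the idempotence point, here is a minimal sketch of such a rule; the `BuildLogicalWrites` name and the `buildWriteFor` helper are hypothetical (the actual rule added by this PR is `V2Writes`):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.write.Write
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

object BuildLogicalWrites extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Only commands whose write is still None are rewritten, so running the
    // rule a second time matches nothing and leaves the plan unchanged.
    case a @ AppendData(r: DataSourceV2Relation, _, _, _, None) =>
      a.copy(write = Some(buildWriteFor(r)))
  }

  // Hypothetical helper: would call r.table.newWriteBuilder(...).build().
  private def buildWriteFor(r: DataSourceV2Relation): Write = ???
}
```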
```diff
-        AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r)) :: Nil
+        AppendDataExecV1(
+          v1, writeOptions.asOptions, query,
+          refreshCache(r), write.map(_.asInstanceOf[V1Write])) :: Nil
```
I think there is one open point we need to discuss: do we want to always apply the new logic, or should we expose a feature flag and construct logical writes only if the flag is enabled? I'd vote for always constructing writes using the new logic, as it feels quite safe and does not carry the burden of maintaining one more config. In addition, this will allow us to simplify this PR a bit and get rid of optional writes in exec nodes.
I'd be curious about what everybody thinks here. I will be okay either way.
```diff
-  override protected def run(): Seq[InternalRow] = {
-    writeWithV1(newWriteBuilder().buildForV1Write(), refreshCache = refreshCache)
+  override protected def buildAndRun(): Seq[InternalRow] = {
```
We can get rid of all `buildAndRun` methods if we are OK to apply the new logic all the time.
I'm interested to hear what @dongjoon-hyun thinks about this.

I think we should have a different physical node for each write so that the explain plan shows what is happening. Otherwise, the approach to support building the batch write here or building it in the optimizer was mainly to be able to turn this on and off in our environment. I doubt that is needed in other situations.

I think I would be for removing all of the `buildAndRun` methods and always building the write in the optimizer.
Does it cause many code changes on top of this? If it is not intrusive, it sounds reasonable in that context. I'd give +1 for the direction, @rdblue.
Getting rid of `buildAndRun` would also ensure we don't have to maintain the same logic in two places.
I'm also +1 on getting rid of `buildAndRun`.
It sounds like we have an intermediate consensus. I'll update the PR, but we can revisit this once we have more input from others.
```diff
@@ -89,6 +90,21 @@ sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {

   def table: SupportsWrite
   def writeOptions: CaseInsensitiveStringMap
   def refreshCache: () => Unit
   def write: Option[V1Write] = None
```
Same here: `Option[V1Write]` can become just `V1Write` if we are OK to apply the new logic all the time.
```scala
val session = sqlContext.sparkSession
// The `plan` is already optimized, we should not analyze and optimize it again.
relation.insert(AlreadyOptimized.dataFrame(session, plan), overwrite = false)
refreshCache()
```
Refresh moved to `run`.
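As a rough sketch of the resulting shape (a hypothetical, simplified trait; import paths approximate, not the PR's exact exec node code):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.V1Write
import org.apache.spark.sql.sources.InsertableRelation

trait V1FallbackWriteSketch {
  def write: V1Write            // logical V1 write, built in the optimizer
  def refreshCache: () => Unit  // invalidates cached plans that read the table

  protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow]

  // refreshCache is now invoked exactly once here, after the write completes,
  // instead of inside writeWithV1.
  def run(): Seq[InternalRow] = {
    val writtenRows = writeWithV1(write.toInsertableRelation)
    refreshCache()
    writtenRows
  }
}
```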
cc @sunchao
lgtm
Test build #132896 has finished for PR 30806 at commit
Hm, the failure is a bit weird and does not seem related.
```diff
@@ -39,6 +39,9 @@ class SparkOptimizer(
     // TODO: move SchemaPruning into catalyst
     SchemaPruning :: V2ScanRelationPushDown :: PruneFileSourcePartitions :: Nil

+  override def dataSourceRewriteRules: Seq[Rule[LogicalPlan]] =
+    V2Writes :: Nil
```
I would probably put this in the early pushdown batch, even though the name doesn't match. The rewrite batch needs to run before this so that writes created by it run through `V2Writes` afterward. That's the same reason why early pushdown runs after plan rewrites.
OK, it makes sense after thinking more about it.
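To make the agreed ordering concrete, a sketch of where the batches would sit inside a `SparkOptimizer`-like class. This is a fragment, not standalone code; the batch names are simplified and `rewriteRules` is a placeholder name:

```scala
// Sketch only: Batch and Once are members inherited from RuleExecutor.
override def defaultBatches: Seq[Batch] =
  super.defaultBatches :+
    // rules in this batch may create new V2 write commands...
    Batch("Plan Rewrites", Once, rewriteRules: _*) :+
    // ...so V2Writes runs afterward, together with early push-down
    Batch("Early Filter and Projection Push-Down", Once,
      V2ScanRelationPushDown, V2Writes)
```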
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #132900 has finished for PR 30806 at commit
Looks good overall. Minor suggestions and nits.
```diff
@@ -188,15 +189,20 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
         orCreate = orCreate) :: Nil
     }

-    case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
+    case AppendData(r: DataSourceV2Relation, query, writeOptions, _, write) =>
```
Is `write` guaranteed not to be `None`? How about rewriting this case as follows?

```scala
case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), query, writeOptions,
    _, Some(v1Write: V1Write)) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
  AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r), v1Write) :: Nil

case AppendData(r @ DataSourceV2Relation(v2: SupportsWrite, _, _, _, _),
    query, writeOptions, _, Some(write)) =>
  AppendDataExec(v2, writeOptions.asOptions, planLater(query), refreshCache(r), write) :: Nil
```

It is not exactly the same as the existing code. Some unmatched cases (not sure how many, or if any) will fall through. An exception will be thrown later, instead of right here upon an instance cast or `Option.get`.
+1 to this idea. It's guaranteed that the `write` will be `Some`, not `None`, at the planner, so matching `Some(write)` is better.

It's possible that the implementation declares `V1_BATCH_WRITE` but doesn't return a `V1Write`. We should give a clear error message if that happens:

```scala
case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), query, writeOptions,
    _, Some(write)) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
  if (!write.isInstanceOf[V1Write]) throw ...
  ...
```
Good idea to add a more meaningful exception here.
I've updated this place. Could you take a look, @jzhuge and @cloud-fan?
Looks great! Thanks for taking care of the `Overwrite*` cases as well.
```diff
-        AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r)) :: Nil
+        AppendDataExecV1(
+          v1, writeOptions.asOptions, query,
+          refreshCache(r), write.get.asInstanceOf[V1Write]) :: Nil
```
Possible to avoid the instance cast? See my suggestion above.
Done.
```diff
-        AppendDataExec(v2, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil
+        AppendDataExec(
+          v2, writeOptions.asOptions, planLater(query),
+          refreshCache(r), write.get) :: Nil
```
Possible to avoid `Option.get`? See my suggestion above.
Got rid of it.
```diff
-  protected def writeWithV1(
-      relation: InsertableRelation,
-      refreshCache: () => Unit = () => ()): Seq[InternalRow] = {
+  protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow] = {
```
Nicely simplified
```diff
-      case v1: V1WriteBuilder => writeWithV1(v1.buildForV1Write())
-      case v2 => writeWithV2(v2.buildForBatch())
+    val write = writeBuilder.build()
+    val writtenRows = write match {
```
Nit: merge lines 451-454 into:

```scala
val writtenRows = table.newWriteBuilder(info).build() match {
```
I don't feel strongly about this place and can update it. However, I do prefer to split different logical parts into different variables. Here, I've separated building a logical write from actually writing the records. Let me know your thoughts, @jzhuge.
Fine with me
```diff
@@ -33,6 +33,11 @@
  */
 @Unstable
 public interface V1WriteBuilder extends WriteBuilder {
```
This is an unstable API; can we just remove it and only use `V1Write`?
I think that should simplify the V1 fallback API. I'll update it, @cloud-fan.
Got rid of `V1WriteBuilder` and tried to update the docs too. Let me know if I missed any places, @cloud-fan.
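For reference, the surviving fallback hook is roughly this shape (a sketch; the exact package and Java/Scala form of `V1Write` in Spark may differ):

```scala
import org.apache.spark.sql.connector.write.Write
import org.apache.spark.sql.sources.InsertableRelation

// With V1WriteBuilder removed, a V1 fallback source exposes its
// InsertableRelation directly from the logical Write.
trait V1Write extends Write {
  def toInsertableRelation: InsertableRelation
}
```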
```diff
 }

-    case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) =>
-      // fail if any filter cannot be converted. correctness depends on removing all matching data.
```
I removed the filter conversion, as it is done earlier now.
```scala
case o @ OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, options, _, None) =>
  // fail if any filter cannot be converted. correctness depends on removing all matching data.
  val filters = splitConjunctivePredicates(deleteExpr).flatMap { pred =>
```
I kept the old logic, but I am not sure whether we should also normalize filters. Thoughts, @cloud-fan @rdblue?
I think we should, to follow DS v1 and the file sources.
I've created SPARK-33868 as a follow-up item. I will keep the old behavior in this PR.
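For the follow-up, a hedged sketch of what adding normalization could look like, assuming the existing `DataSourceStrategy.normalizeExprs` and `translateFilter` helpers (an illustration of the SPARK-33868 idea, not part of this PR):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.sources.Filter

object NormalizedFilterConversion extends PredicateHelper {
  def toSourceFilters(deleteExpr: Expression, r: DataSourceV2Relation): Seq[Filter] = {
    // Normalize attribute names against the relation output before translating,
    // mirroring what DS v1 and the file sources do.
    val normalized = DataSourceStrategy.normalizeExprs(
      splitConjunctivePredicates(deleteExpr), r.output)
    normalized.map { pred =>
      DataSourceStrategy.translateFilter(pred, supportNestedPredicatePushdown = true)
        .getOrElse(throw new AnalysisException(
          s"Cannot translate expression to source filter: $pred"))
    }
  }
}
```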
Test build #132965 has finished for PR 30806 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Retest this, please.
Test build #133150 has finished for PR 30806 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Lead-authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Co-authored-by: Ryan Blue <blue@apache.org>

Force-pushed from e6335c0 to 84241e0.
I think certain checks are expected to fail, per the discussion here.
Test build #133160 has finished for PR 30806 at commit
Kubernetes integration test starting
```scala
case v2Write =>
  throw new AnalysisException(
    s"Table ${v1.name} declares ${TableCapability.V1_BATCH_WRITE} capability but " +
    s"${v2Write.getClass} is not an instance of ${classOf[V1Write]}")
```
`classOf[V1Write].getName`
Done.
```diff
@@ -65,7 +66,8 @@ case class AppendData(
     table: NamedRelation,
     query: LogicalPlan,
     writeOptions: Map[String, String],
-    isByName: Boolean) extends V2WriteCommand {
+    isByName: Boolean,
+    write: Option[Write] = None) extends V2WriteCommand {
   override def withNewQuery(newQuery: LogicalPlan): AppendData = copy(query = newQuery)
   override def withNewTable(newTable: NamedRelation): AppendData = copy(table = newTable)
```
Shall we add `override lazy val resolved = ... && write.isDefined` in `V2WriteCommand`? It's safer to make sure that the analyzer creates the `Write` object.
Sounds like a good idea, but we actually construct the `Write` object in the optimizer, after the operator optimization is done, to ensure we operate on optimal expressions.
Ah, I see. Let's leave it then.
LGTM
We need to update
Kubernetes integration test status success
```diff
@@ -132,7 +135,8 @@ case class OverwritePartitionsDynamic(
     table: NamedRelation,
     query: LogicalPlan,
     writeOptions: Map[String, String],
-    isByName: Boolean) extends V2WriteCommand {
+    isByName: Boolean,
+    write: Option[Write] = None) extends V2WriteCommand {
```
Not related to this PR, but I'm wondering whether we should have an optional `Scan` object in `DataSourceV2Relation`, instead of adding a new logical plan `DataSourceV2ScanRelation`. It's simpler and consistent with the write logical plans. cc @rdblue
I think that's a good idea.
Quick question: `DataSourceV2Relation` is also used inside write nodes like `AppendData`. If we add an optional scan, will that mean we leak a read-specific concept into write plans?
For `AppendData`, we intentionally do not treat the `table` as a child, which means the pushdown rule won't apply to it and the `Scan` object will always be `None` in `AppendData`.
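To make the idea concrete, a hypothetical shape (illustrative only; this is not what Spark or this PR implements):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.Scan

// A single relation node carrying an optional Scan, mirroring the optional
// Write on the V2 write commands: None until push-down builds the Scan, and
// always None when the relation only appears inside a write command.
case class DataSourceV2RelationWithScan(
    table: Table,
    output: Seq[AttributeReference],
    scan: Option[Scan] = None) extends LeafNode
```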
Test build #133168 has finished for PR 30806 at commit
Retest this please.
Kubernetes integration test starting
Kubernetes integration test status success
Refer to this link for build results (access rights to CI server needed):
Test build #133186 has finished for PR 30806 at commit
Refer to this link for build results (access rights to CI server needed):
Thanks, merging to master!
Thanks @cloud-fan @jzhuge @rdblue @sunchao @dongjoon-hyun!
What changes were proposed in this pull request?
This PR adds logic to build logical writes introduced in SPARK-33779.
Note: This PR contains a subset of changes discussed in PR #29066.
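At a high level, the optimizer rule builds the `Write` once so that physical planning only consumes it. A simplified sketch of what `V2Writes` does; the `LogicalWriteInfoImpl` constructor shown here is an assumption about the internal API:

```scala
import java.util.UUID
import scala.collection.JavaConverters._
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.Write
import org.apache.spark.sql.execution.datasources.v2.LogicalWriteInfoImpl
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Build a logical Write once, in the optimizer; the planner then matches
// AppendData(..., Some(write)) instead of re-creating it at execution time.
def buildWrite(
    table: SupportsWrite,
    querySchema: StructType,
    options: Map[String, String]): Write = {
  val info = LogicalWriteInfoImpl(
    queryId = UUID.randomUUID().toString,
    schema = querySchema,
    options = new CaseInsensitiveStringMap(options.asJava))
  table.newWriteBuilder(info).build()
}
```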
Why are the changes needed?
These changes are the next step as discussed in the design doc for SPARK-23889.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests.